/**
 * Test to make sure that the abort command interrupts a resharding operation that has not yet
 * persisted a decision.
 *
 * @tags: [uses_atclustertime]
 */
import {DiscoverTopology} from "jstests/libs/discover_topology.js";
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {Thread} from "jstests/libs/parallelTester.js";
import {ReshardingTest} from "jstests/sharding/libs/resharding_test_fixture.js";

// Namespace of the collection under test. It is handed to abortReshardCollection and to every
// scenario callback, and it is the namespace looked up in config.reshardingOperations.
const originalCollectionNs = "reshardingDb.coll";
// Failpoint enabled on the config server right before the abort thread is started. The test waits
// on it to observe that the coordinator has recognized the abort, then turns it off again.
const enterAbortFailpointName = "reshardingPauseCoordinatorBeforeStartingErrorFlow";

/**
 * Kinds of nodes on which a scenario can install its pausing failpoint.
 *
 * COORDINATOR targets the first config server node, DONOR and RECIPIENT target the primary of every
 * donor / recipient shard, and NO_EXTRA_FAILPOINTS_SENTINEL marks a scenario that installs no
 * failpoint of its own. Frozen so that a scenario callback cannot alter the shared values.
 */
const nodeTypeEnum = Object.freeze({
    COORDINATOR: 1,
    DONOR: 2,
    RECIPIENT: 3,
    NO_EXTRA_FAILPOINTS_SENTINEL: 4
});

/**
 * Points in the resharding operation at which runAbortWithFailpoint issues the abort.
 *
 * BEFORE_STEADY_STATE aborts from the callback that runs while resharding is in the background,
 * BEFORE_DECISION_PERSISTED aborts from the fixture's postCheckConsistencyFn hook, and
 * AFTER_DECISION_PERSISTED aborts from the fixture's postDecisionPersistedFn hook. Frozen so that
 * a scenario callback cannot alter the shared values.
 */
const abortLocationEnum = Object.freeze({
    BEFORE_STEADY_STATE: 1,
    BEFORE_DECISION_PERSISTED: 2,
    AFTER_DECISION_PERSISTED: 3
});

/**
 * Resolves a node type to the connection strings of the nodes a failpoint should be set on.
 *
 * @param {number} nodeType - One of the nodeTypeEnum values.
 * @param {Object} reshardingTest - Fixture exposing donorShardNames and recipientShardNames.
 * @param {Object} topology - Topology object with `configsvr.nodes` and `shards[name].primary`.
 * @returns {string[]} The first config server node for COORDINATOR, the primary of every donor /
 *     recipient shard for DONOR / RECIPIENT, and an empty list for NO_EXTRA_FAILPOINTS_SENTINEL.
 * @throws {Error} If nodeType is not a nodeTypeEnum value.
 */
const getConnStringsFromNodeType = (nodeType, reshardingTest, topology) => {
    const primaryOf = (shardName) => topology.shards[shardName].primary;

    switch (nodeType) {
        case nodeTypeEnum.COORDINATOR:
            return [topology.configsvr.nodes[0]];
        case nodeTypeEnum.DONOR:
            return Array.from(reshardingTest.donorShardNames, primaryOf);
        case nodeTypeEnum.RECIPIENT:
            return Array.from(reshardingTest.recipientShardNames, primaryOf);
        case nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL:
            // The scenario installs no failpoint of its own, so there is nothing to target.
            return [];
        default:
            // Throw a real Error (not a string) so the failure carries a stack trace.
            throw new Error(`unsupported node type in resharding abort test: ${nodeType}`);
    }
};

/**
 * Opens one new shell connection per connection string.
 *
 * @param {Iterable<string>} connStrings - Hosts to connect to.
 * @returns {Mongo[]} The connections, in the same order as connStrings.
 */
let getMongosFromConnStrings = (connStrings) =>
    Array.from(connStrings, (connString) => new Mongo(connString));

/**
 * Enables the named failpoint on every node selected by failpointNodeType.
 *
 * @param {string} failpointName - Name of the failpoint to configure.
 * @param {number} failpointNodeType - One of the nodeTypeEnum values; selects the target nodes.
 * @param {Object} reshardingTest - Fixture used to look up donor / recipient shard names.
 * @param {Object} topology - Topology object used to resolve shard names to hosts.
 * @param {*} [failpointMode="alwaysOn"] - Mode forwarded to configureFailPoint.
 * @returns {Array} One configureFailPoint result per targeted node, in target order.
 */
const generateFailpoints =
    (failpointName, failpointNodeType, reshardingTest, topology, failpointMode = "alwaysOn") => {
        const targetConnStrings =
            getConnStringsFromNodeType(failpointNodeType, reshardingTest, topology);
        const targetHosts = getMongosFromConnStrings(targetConnStrings);

        return targetHosts.map(
            (host) => configureFailPoint(host, failpointName, {} /* data */, failpointMode));
    };

/**
 * Builds (but does not start) a Thread that runs abortReshardCollection against a mongos.
 *
 * @param {string} mongosConnString - Host of the mongos the thread connects to.
 * @param {string} ns - Namespace whose resharding operation should be aborted.
 * @param {number|number[]} expectedErrorCodes - ErrorCodes.OK when the abort must succeed,
 *     otherwise the code(s) the abort command is expected to fail with.
 * @returns {Thread} The unstarted thread; the caller is responsible for start() and join().
 */
let generateAbortThread = (mongosConnString, ns, expectedErrorCodes) => {
    // The thread body receives all of its inputs as arguments instead of closing over them
    // (presumably because Thread runs the function in a separate scope -- confirm against
    // parallelTester.js).
    return new Thread((host, nss, expectedCodes) => {
        const conn = new Mongo(host);
        const expectSuccess = expectedCodes == ErrorCodes.OK;
        const abortResult = conn.adminCommand({abortReshardCollection: nss});

        if (expectSuccess) {
            assert.commandWorked(abortResult);
            return;
        }
        assert.commandFailedWithCode(abortResult, expectedCodes);
    }, mongosConnString, ns, expectedErrorCodes);
};

/**
 * Drives one abort attempt and sequences it against the scenario's failpoints.
 *
 * Order of operations: run the optional before-waiting hook, wait for every scenario failpoint to
 * be hit, run the optional after-waiting hook, enable the coordinator's enter-abort failpoint,
 * start the abort thread, wait for the coordinator to hit that failpoint, run the optional
 * after-aborting hook, then release the enter-abort failpoint followed by the scenario
 * failpoints. The scenario failpoints are skipped entirely when failpointNodeType is
 * NO_EXTRA_FAILPOINTS_SENTINEL.
 *
 * @param {?string} failpointName - Scenario failpoint name (used only in log messages here).
 * @param {number} failpointNodeType - One of the nodeTypeEnum values.
 * @param {Object} reshardingTest - Fixture, forwarded to the hooks.
 * @param {Object} topology - Topology object, forwarded to the hooks.
 * @param {Mongo} mongos - Connection forwarded to the hooks.
 * @param {Mongo} configsvr - Config server connection the enter-abort failpoint is set on.
 * @param {Thread} abortThread - Unstarted abort thread; started here, joined by the caller.
 * @param {Array} failpoints - Already-configured scenario failpoints (wait()/off() handles).
 * @param {?Function} executeBeforeWaitingOnFailpointsFn - Called with (mongos, ns).
 * @param {?Function} executeAfterWaitingOnFailpointsFn - Called with
 *     (reshardingTest, topology, mongos, ns).
 * @param {?Function} executeAfterAbortingFn - Called with (reshardingTest, topology, mongos, ns).
 */
let triggerAbortAndCoordinateFailpoints = (failpointName,
                                           failpointNodeType,
                                           reshardingTest,
                                           topology,
                                           mongos,
                                           configsvr,
                                           abortThread,
                                           failpoints,
                                           executeBeforeWaitingOnFailpointsFn,
                                           executeAfterWaitingOnFailpointsFn,
                                           executeAfterAbortingFn) => {
    // Logs and invokes an optional scenario hook; does nothing when the hook is not supplied.
    const runHook = (logMessage, hookFn, ...hookArgs) => {
        if (!hookFn) {
            return;
        }
        jsTestLog(logMessage);
        hookFn(...hookArgs);
    };

    runHook(`Executing the before-waiting-on-failpoint function`,
            executeBeforeWaitingOnFailpointsFn,
            mongos,
            originalCollectionNs);

    const usesScenarioFailpoints =
        failpointNodeType != nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL;
    if (usesScenarioFailpoints) {
        jsTestLog(`Wait for the failpoint ${failpointName} to be reached on all applicable nodes`);
        failpoints.forEach((scenarioFailpoint) => scenarioFailpoint.wait());
    }

    runHook(`Executing the after-waiting-on-failpoint function`,
            executeAfterWaitingOnFailpointsFn,
            reshardingTest,
            topology,
            mongos,
            originalCollectionNs);

    jsTestLog(`Wait for the coordinator to recognize that it's been aborted`);

    // The failpoint must be enabled before the abort is issued so the coordinator cannot get
    // past it unobserved.
    const coordinatorAbortFailpoint = configureFailPoint(configsvr, enterAbortFailpointName);
    abortThread.start();
    coordinatorAbortFailpoint.wait();

    runHook(`Executing the after-aborting function`,
            executeAfterAbortingFn,
            reshardingTest,
            topology,
            mongos,
            originalCollectionNs);

    coordinatorAbortFailpoint.off();

    if (usesScenarioFailpoints) {
        jsTestLog(`Turn off the failpoint ${
            failpointName} to allow both the abort and the resharding operation to complete`);
        failpoints.forEach((scenarioFailpoint) => scenarioFailpoint.off());
    }
};

/**
 * Starts the abort thread only once the coordinator has moved past persisting its decision.
 *
 * @param {Mongo} mongos - Connection used to read config.reshardingOperations.
 * @param {Thread} abortThread - Unstarted abort thread; started here, joined by the caller.
 */
let triggerPostDecisionPersistedAbort = (mongos, abortThread) => {
    // Once the decision has been persisted the coordinator document is either in the
    // "decision-persisted" state, in the "done" state, or already deleted. Any of the three means
    // the abort command can no longer abort the operation, which is what this path tests.
    const decisionHasBeenPersisted = () => {
        const reshardingOpDoc =
            mongos.getCollection('config.reshardingOperations').findOne({ns: originalCollectionNs});
        if (reshardingOpDoc == null) {
            return true;
        }
        return reshardingOpDoc.state === "decision-persisted" || reshardingOpDoc.state === "done";
    };

    assert.soon(decisionHasBeenPersisted);
    abortThread.start();
};

/**
 * Runs one abort scenario end to end on a fresh ReshardingTest fixture.
 *
 * The scenario creates a collection sharded on {oldKey: 1} across two donors, reshards it in the
 * background to {newKey: 1} across two recipients, issues abortReshardCollection at the point
 * selected by abortLocation, and finally checks the config server's resharding counters.
 *
 * When abortLocation is AFTER_DECISION_PERSISTED the resharding operation is expected to succeed
 * and the abort to fail with ReshardCollectionCommitted or NoSuchReshardCollection; otherwise the
 * abort is expected to succeed and resharding to fail with ReshardCollectionAborted.
 *
 * @param {?string} failpointName - Failpoint to pause on, or null when none is installed here.
 * @param {number} failpointNodeType - nodeTypeEnum value selecting where failpointName is set;
 *     NO_EXTRA_FAILPOINTS_SENTINEL installs nothing.
 * @param {number} abortLocation - abortLocationEnum value selecting when the abort is issued.
 * @param {Object} [hooks] - Optional scenario callbacks, all defaulting to null:
 *     executeBeforeReshardingStartsFn(reshardingTest, topology, mongos, ns) runs before the
 *     scenario failpoints are configured; executeAtStartOfReshardingFn(reshardingTest, topology,
 *     mongos, ns) runs first inside the during-resharding callback; the remaining three are
 *     forwarded to triggerAbortAndCoordinateFailpoints.
 */
const runAbortWithFailpoint = (failpointName, failpointNodeType, abortLocation, {
    executeBeforeReshardingStartsFn = null,
    executeAtStartOfReshardingFn = null,
    executeBeforeWaitingOnFailpointsFn = null,
    executeAfterWaitingOnFailpointsFn = null,
    executeAfterAbortingFn = null,
} = {}) => {
    const reshardingTest =
        new ReshardingTest({numDonors: 2, numRecipients: 2, reshardInPlace: true});
    reshardingTest.setup();

    const donorShardNames = reshardingTest.donorShardNames;
    const recipientShardNames = reshardingTest.recipientShardNames;

    // Use the shared namespace constant so the collection that is created can never drift from
    // the one the abort thread and the scenario callbacks operate on.
    const sourceCollection = reshardingTest.createShardedCollection({
        ns: originalCollectionNs,
        shardKeyPattern: {oldKey: 1},
        chunks: [
            {min: {oldKey: MinKey}, max: {oldKey: 0}, shard: donorShardNames[0]},
            {min: {oldKey: 0}, max: {oldKey: MaxKey}, shard: donorShardNames[1]},
        ],
    });

    const mongos = sourceCollection.getMongo();
    const topology = DiscoverTopology.findConnectedNodes(mongos);
    const configsvr = new Mongo(topology.configsvr.nodes[0]);

    // If the abort happens after the decision is persisted, the resharding operation is expected
    // to finish without error and the abort itself is expected to fail.
    const abortsAfterDecision = abortLocation === abortLocationEnum.AFTER_DECISION_PERSISTED;
    const expectedAbortErrorCodes = abortsAfterDecision
        ? [ErrorCodes.ReshardCollectionCommitted, ErrorCodes.NoSuchReshardCollection]
        : ErrorCodes.OK;
    const expectedReshardingErrorCode =
        abortsAfterDecision ? ErrorCodes.OK : ErrorCodes.ReshardCollectionAborted;

    const abortThread = generateAbortThread(
        topology.mongos.nodes[0], originalCollectionNs, expectedAbortErrorCodes);

    if (executeBeforeReshardingStartsFn) {
        jsTestLog(`Executing the before-resharding-starts fn`);
        executeBeforeReshardingStartsFn(reshardingTest, topology, mongos, originalCollectionNs);
    }

    const failpoints = failpointNodeType !== nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL
        ? generateFailpoints(failpointName, failpointNodeType, reshardingTest, topology)
        : [];

    // Both pre-decision abort locations run the exact same sequence; only the fixture hook they
    // run from differs.
    const triggerAbort = () => {
        triggerAbortAndCoordinateFailpoints(failpointName,
                                            failpointNodeType,
                                            reshardingTest,
                                            topology,
                                            mongos,
                                            configsvr,
                                            abortThread,
                                            failpoints,
                                            executeBeforeWaitingOnFailpointsFn,
                                            executeAfterWaitingOnFailpointsFn,
                                            executeAfterAbortingFn);
    };

    reshardingTest.withReshardingInBackground(
        {
            newShardKeyPattern: {newKey: 1},
            newChunks: [
                {min: {newKey: MinKey}, max: {newKey: 0}, shard: recipientShardNames[0]},
                {min: {newKey: 0}, max: {newKey: MaxKey}, shard: recipientShardNames[1]},
            ],
        },
        () => {
            if (executeAtStartOfReshardingFn) {
                jsTestLog(`Executing the start-of-resharding fn`);
                executeAtStartOfReshardingFn(
                    reshardingTest, topology, mongos, originalCollectionNs);
            }

            if (abortLocation === abortLocationEnum.BEFORE_STEADY_STATE) {
                triggerAbort();
            }
        },
        {
            expectedErrorCode: expectedReshardingErrorCode,
            postCheckConsistencyFn: () => {
                if (abortLocation === abortLocationEnum.BEFORE_DECISION_PERSISTED) {
                    triggerAbort();
                }
            },
            postDecisionPersistedFn: () => {
                if (abortsAfterDecision) {
                    triggerPostDecisionPersistedAbort(mongos, abortThread);
                }
            }
        });

    const reshardingMetrics =
        configsvr.getDB('admin').serverStatus({}).shardingStatistics.resharding;

    // The fixture is fresh, so exactly one operation was started and it ended in exactly one of
    // the two outcomes.
    assert.eq(reshardingMetrics.countStarted, 1);
    if (abortsAfterDecision) {
        assert.eq(reshardingMetrics.countSucceeded, 1);
        assert.eq(reshardingMetrics.countCanceled, 0);
    } else {
        assert.eq(reshardingMetrics.countCanceled, 1);
        assert.eq(reshardingMetrics.countSucceeded, 0);
    }

    abortThread.join();
    reshardingTest.teardown();
};

// Scenario: abort while every recipient primary is paused at the
// reshardingPauseRecipientBeforeCloning failpoint.
runAbortWithFailpoint("reshardingPauseRecipientBeforeCloning",
                      nodeTypeEnum.RECIPIENT,
                      abortLocationEnum.BEFORE_STEADY_STATE);

// Scenario: abort while every recipient primary is paused at the
// reshardingPauseRecipientDuringCloning failpoint.
runAbortWithFailpoint("reshardingPauseRecipientDuringCloning",
                      nodeTypeEnum.RECIPIENT,
                      abortLocationEnum.BEFORE_STEADY_STATE);

// Scenario: abort while every recipient primary is paused at the
// reshardingPauseRecipientDuringOplogApplication failpoint. Once the recipients are paused, and
// before the abort is issued, documents are inserted on both sides of the old-key and new-key
// split points (0).
runAbortWithFailpoint(
    "reshardingPauseRecipientDuringOplogApplication",
    nodeTypeEnum.RECIPIENT,
    abortLocationEnum.BEFORE_STEADY_STATE,
    {
        executeAfterWaitingOnFailpointsFn: (reshardingTest, topology, mongos, ns) => {
            assert.commandWorked(mongos.getCollection(ns).insert([
                {_id: 0, oldKey: -10, newKey: -10},
                {_id: 1, oldKey: 10, newKey: -10},
                {_id: 2, oldKey: -10, newKey: 10},
                {_id: 3, oldKey: 10, newKey: 10},
            ]));
        },
    });

/**
 * Blocks until the coordinator document for `ns` lists at least one recipient shard and every
 * listed recipient reports the "applying" state.
 *
 * @param {Mongo} mongos - Connection used to read config.reshardingOperations.
 * @param {string} ns - Namespace of the collection being resharded.
 */
function waitForAllRecipientsToReachApplying(mongos, ns) {
    const isApplying = (recipientEntry) => recipientEntry.mutableState.state === "applying";

    assert.soon(() => {
        const reshardingOpDoc =
            mongos.getCollection('config.reshardingOperations').findOne({ns: ns});

        // No coordinator document yet: keep polling.
        if (reshardingOpDoc === null) {
            return false;
        }

        // A missing or empty recipient list must not count as "all recipients applying".
        const recipients = reshardingOpDoc.recipientShards;
        if (!Array.isArray(recipients) || recipients.length === 0) {
            return false;
        }

        return recipients.every(isApplying);
    });
}

// Scenario: abort once every recipient is in the "applying" state, without installing any extra
// failpoint. Rely on the resharding_test_fixture's built-in failpoint that hangs before switching
// to the blocking writes state.
runAbortWithFailpoint(
    null, nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL, abortLocationEnum.BEFORE_STEADY_STATE, {
        executeAtStartOfReshardingFn: (reshardingTest, topology, mongos, ns) => {
            waitForAllRecipientsToReachApplying(mongos, ns);
        },
    });

// Test that the resharding operation can successfully be aborted even when the commit monitor won't
// ever signal to the coordinator the resharding operation is ready to commit. The
// hangBeforeQueryingRecipients failpoint is set on the coordinator (first config server node), and
// the abort is issued only after every recipient has reached the "applying" state.
runAbortWithFailpoint("hangBeforeQueryingRecipients",
                      nodeTypeEnum.COORDINATOR,
                      abortLocationEnum.BEFORE_STEADY_STATE,
                      {
                          executeAtStartOfReshardingFn: (reshardingTest, topology, mongos, ns) => {
                              waitForAllRecipientsToReachApplying(mongos, ns);
                          },
                      });

// Scenario: issue the abort only after the coordinator has persisted its decision. The resharding
// operation is expected to succeed and abortReshardCollection is expected to fail with
// ReshardCollectionCommitted or NoSuchReshardCollection.
runAbortWithFailpoint(
    null, nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL, abortLocationEnum.AFTER_DECISION_PERSISTED);

// The resharding test fixture uses its own set of coordinator failpoints for resharding
// checkpoints. It may not be possible to insert documents once the second checkpoint is reached.
// Because of this, we cannot rely on the failpoint mechanism set up in this test file. Instead, we
// must manually activate and deactivate the failpoints across a checkpoint threshold.
//
// executeAtStartOfReshardingFn runs while the coordinator is in steady state (checkpoint 1), and
// executeAfterWaitingOnFailpointsFn will run while the coordinator is blocking writes (checkpoint
// 2).

// Failpoints on the recipient primaries for the scenario below. They are configured in
// executeBeforeReshardingStartsFn and shared with the later callbacks, which wait on them and
// eventually turn them off.
let recipientFailpoints = [];
runAbortWithFailpoint(
    null, nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL, abortLocationEnum.BEFORE_DECISION_PERSISTED, {
        // Pause every recipient during oplog application before resharding starts.
        executeBeforeReshardingStartsFn: (reshardingTest, topology) => {
            recipientFailpoints =
                generateFailpoints("reshardingPauseRecipientDuringOplogApplication",
                                   nodeTypeEnum.RECIPIENT,
                                   reshardingTest,
                                   topology);
        },
        // Once every recipient is paused, insert documents on both sides of the old-key and
        // new-key split points while writes are still allowed.
        executeAtStartOfReshardingFn: (reshardingTest, topology, mongos, ns) => {
            for (const failpoint of recipientFailpoints) {
                failpoint.wait();
            }
            assert.commandWorked(mongos.getCollection(ns).insert([
                {_id: 4, oldKey: -10, newKey: -10},
                {_id: 5, oldKey: 10, newKey: -10},
                {_id: 6, oldKey: -10, newKey: 10},
                {_id: 7, oldKey: 10, newKey: 10},
            ]));
        },
        // Wait until EVERY donor reports the "blocking-writes" state before the abort is issued.
        // A `return` inside a per-donor loop would only ever inspect the first donor.
        executeAfterWaitingOnFailpointsFn: (reshardingTest, topology, mongos, ns) => {
            // Connect to each donor once instead of opening new connections on every poll.
            const donorConns = Array.from(reshardingTest.donorShardNames,
                                          (donor) => new Mongo(topology.shards[donor].primary));

            assert.soon(() => donorConns.every((donorConn) => {
                const donorDoc =
                    donorConn.getCollection('config.localReshardingOperations.donor').findOne({
                        ns: ns
                    });
                return donorDoc != null && donorDoc.mutableState.state === "blocking-writes";
            }));
        },
        // Release the recipients while the coordinator is still held in its abort path.
        executeAfterAbortingFn: () => {
            for (const failpoint of recipientFailpoints) {
                failpoint.off();
            }
        }
    });
